#include "config.h"

#include <sys/types.h>
#include <sys/wait.h>
#include <sys/un.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include <pthread.h>

#include "../../storage/controller/volume_ctl.h"
#include "get_version.h"
#include "mem_cache.h"
#include "core.h"
#include "corenet.h"
#include "job_dock.h"
#include "sock_unix.h"
#include "../../ynet/sock/sock_tcp.h"
#include "../../ynet/net/net_events.h"
#include "lich_sheepdog.h"
#include "stor_root.h"
#include "sd_map.h"
#include "lichstor.h"
#include "analysis.h"
#include "dbg.h"

/*
 * Transport kinds for a session (stored in sheepdog_session_t.type);
 * presumably the values callers pass as @type to sheepdog_session_alloc() —
 * the callers are not visible here.
 */
#define __TYPE_UNIX__ 1
#define __TYPE_TCP__ 2


/*
 * Compile-time switch for the direct I/O path.  When non-zero, sessions carry
 * an mcache_entry_t (sheepdog_session_t.entry) and the raw read/write helpers
 * use volume_ctl_{read,write}_direct() while that entry is set, falling back
 * to stor_{read,write}() otherwise.  Currently disabled.
 */
#define USE_DIRECT_IO 0

/*
 * Per-connection state for one sheepdog client.
 */
typedef struct sheepdog_session {
        fileid_t id;            /* volume id; logged on close */
        int sd;                 /* socket used by the blocking request/reply handlers */
        int vid;                /* vdi id obtained from sd_map_open() in lock_vdi */
        uint32_t addr;          /* not referenced in the code visible here */
        sockid_t sockid;        /* destination for __sheepdog_reply() sends */
        int running;            /* in-flight request count; must be 0 at close */
        int type;               /* transport kind, set by sheepdog_session_alloc() */

#if USE_DIRECT_IO
        mcache_entry_t *entry;  /* localized volume handle, NULL when not localized */
#endif

        char export_name[128];  /* name used in log messages */
} sheepdog_session_t;

/*
 * A request handed to __sheepdog_func(): the protocol header, its payload
 * buffer, and the owning session.
 */
typedef struct {
        SheepdogReq req;        /* request header */
        sockid_t sockid;        /* not referenced in the code visible here */
        buffer_t buf;           /* request payload (write data); freed after the reply */
        void *ctx;              /* the owning sheepdog_session_t */
} sd_request_t;

/* Listening sockets for the TCP and unix-domain front ends (presumably set up
 * by code outside this chunk). */
static int __tcp_listen_sd__;
static int __unix_listen_sd__;
static struct sockaddr_un __unix_listen_addr__;

/* Forward declaration: the blocking handlers below send error replies with it. */
static int __sheepdog_reply_errno1(int sd, int err, const SheepdogReq *req);

/*
 * Read exactly @size bytes from socket @sd into @buf.
 *
 * Polls FIONREAD every 100ms until at least @size bytes are queued, then
 * reads them with a single _read().  A queue size of 0 is treated as the peer
 * having closed the connection.  The wait is bounded so that a peer sending a
 * short request cannot stall the calling thread forever.
 *
 * Returns the _read() result on success, or a negative errno:
 *   -ECONNRESET  nothing queued on the socket
 *   -ETIMEDOUT   @size bytes did not arrive within READ_RETRY_MAX polls
 *   -errno       ioctl()/_read() failure
 */
static int __read(int sd, char *buf, int size)
{
        /* poll interval and number of polls (~30s) while waiting for @size bytes */
        enum { READ_POLL_US = 100 * 1000, READ_RETRY_MAX = 300 };
        int ret, toread, retry = 0;

retry:
        ret = ioctl(sd, FIONREAD, &toread);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        if (toread == 0) {
                ret = ECONNRESET;
                GOTO(err_ret, ret);
        }

        if (toread < size) {
                if (retry >= READ_RETRY_MAX) {
                        DWARN("got %d need %d, give up after %d retries\n",
                              toread, size, retry);
                        ret = ETIMEDOUT;
                        GOTO(err_ret, ret);
                }

                usleep(READ_POLL_US);
                retry++;
                if (retry < 10) {
                        DWARN("got %d need %d\n", toread, size);
                }

                goto retry;
        }

        ret = _read(sd, buf, size);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return ret;
err_ret:
        return -ret;
}

/*
 * Check whether the session's volume is localized.
 *
 * Only does work when USE_DIRECT_IO is enabled.  In that case:
 *   - if no entry is cached, it queries volume_ctl_get() for session->id and
 *     caches the returned mcache entry in session->entry on success;
 *   - if an entry is cached, it re-queries and releases the cached entry when
 *     volume_ctl_get() reports EREMCHG.
 * With USE_DIRECT_IO == 0 (the current setting) it is a no-op.
 *
 * @ctx: the sheepdog_session_t to check.
 */
static void __sheepdog_session_check(void *ctx)
{
#if USE_DIRECT_IO
        int ret;
        sheepdog_session_t *session = ctx;
        mcache_entry_t *entry;

        DINFO("session check\n");

        if (session->entry == NULL) {
                /* not localized yet: try to acquire an entry */
                DINFO("check %s localize\n", session->export_name);

                ret = volume_ctl_get(&session->id, &entry);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("%s not localized\n", session->export_name);
                        } else {
                                DWARN("%s check fail %u %s\n", session->export_name, ret, strerror(ret));
                        }
                } else {
                        session->entry = entry;
                        DINFO("%s localized\n", session->export_name);
                }
        } else {
                /* already localized: confirm the volume is still local */
                DINFO("recheck %s localize\n", session->export_name);

                ret = volume_ctl_get(&session->id, &entry);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("release %s localize\n", session->export_name);
                                if (session->entry) {
                                        mcache_release(session->entry);
                                        session->entry = NULL;
                                }
                        } else {
                                DWARN("%s check fail %u %s\n", session->export_name, ret, strerror(ret));
                        }
                } else {
                        /* presumably volume_ctl_get() took an extra reference on the
                         * same entry; drop it and keep the cached one */
                        YASSERT(session->entry == entry);
                        mcache_release(entry);
                        DINFO("check %s localize ok\n", session->export_name);
                }
        }
#else
        (void) ctx;
#endif
}

/*
 * Allocate a zeroed session bound to socket @fd with transport kind @type.
 *
 * On success *_se receives the new session and 0 is returned; otherwise the
 * ymalloc() error is returned and *_se is left untouched.
 */
static int sheepdog_session_alloc(sheepdog_session_t **_se, int fd, int type)
{
        sheepdog_session_t *session;
        int ret;

        ret = ymalloc((void **)&session, sizeof(*session));
        if (ret)
                GOTO(err_ret, ret);

        memset(session, 0x0, sizeof(*session));
        session->type = type;
        session->sd = fd;

        *_se = session;
        return 0;
err_ret:
        return ret;
}

/*
 * Tear down session @arg.
 *
 * Closing a session that still has running requests is treated as fatal
 * (EXIT(EAGAIN)).  Otherwise any cached direct-IO entry is released, the close
 * is logged, and the session memory is freed.
 */
static inline void __sheepdog_close(void *arg)
{
        sheepdog_session_t *session = arg;

        if (session->running) {
                DWARN("running %u\n", session->running);
                EXIT(EAGAIN);
        }

#if USE_DIRECT_IO
        if (session->entry) {
                mcache_release(session->entry);
                session->entry = NULL;
        }
#endif

        DINFO("session %s/"CHKID_FORMAT" closed\n", session->export_name,
              CHKID_ARG(&session->id));

        yfree((void **)&session);
}

/*
 * Create volume @file (path resolved in the "default" namespace) and, when
 * @size is non-zero, truncate it to @size bytes.
 *
 * EAGAIN from any of the three stor_* steps is retried with USLEEP_RETRY
 * (limit 50, 100ms sleep); all steps share the same retry counter.  Returns 0
 * on success, EEXIST when the volume already exists (the size is then not
 * applied), or the first other error.
 */
static int __sheepdog_mkvol(const char *file, uint64_t size)
{
        fileid_t parentid, fileid;
        char name[MAX_NAME_LEN];
        int ret, retry = 0;

retry0:
        ret = stor_splitpath("default", file, &parentid, name);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry0, retry, 50, (100 * 1000));
        }

retry1:
        ret = stor_mkvol(&parentid, name, NULL, &fileid);
        if (unlikely(ret)) {
                /* already there: report EEXIST to the caller, skip the truncate */
                if (ret == EEXIST)
                        goto err_ret;

                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry1, retry, 50, (100 * 1000));
        }

        if (size == 0)
                return 0;

retry2:
        ret = stor_truncate("default", &fileid, size);
        if (unlikely(ret)) {
                if (ret != EAGAIN)
                        GOTO(err_ret, ret);

                USLEEP_RETRY(err_ret, ret, retry2, retry, 50, (100 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Handle SD_OP_NEW_VDI on the blocking socket path.
 *
 * Reads the request header plus the SD_MAX_VDI_LEN-byte vdi name from se->sd,
 * creates "/<sheepdog_root>/<name>" sized req->vdi_size, and writes a
 * SheepdogVdiRsp.  Snapshot requests (snapid != 0) are rejected with ENOSPC,
 * and a path that does not fit MAX_PATH_LEN is rejected with ENAMETOOLONG.
 * On any failure an error reply derived from @_req is written and the errno
 * is returned.
 */
static int  __sheepdog_new_vdi(sheepdog_session_t *se, const SheepdogReq *_req)
{
        int ret;
        char buf[MAX_BUF_LEN], *name, path[MAX_PATH_LEN];
        const SheepdogVdiReq *req;
        SheepdogVdiRsp rep;

        req = (void *)_req;
        ret = __read(se->sd, buf, sizeof(*req) + SD_MAX_VDI_LEN);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        name = buf + sizeof(*req);
        /* the name comes off the wire: force termination inside its field */
        name[SD_MAX_VDI_LEN - 1] = '\0';

        DINFO("SHEEPDOG: req version %u, 0x%x, datalen %u, size %llu, name %s, snapshot %u\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->vdi_size, name, req->snapid);

        if (req->snapid) {
                ret = ENOSPC;
                DWARN("snapshot is not supported\n");
                GOTO(err_ret, ret);
        }

        /* bound by the real buffer size and refuse to act on a truncated path */
        ret = snprintf(path, sizeof(path), "/%s/%s", gloconf.sheepdog_root, name);
        if (ret < 0 || ret >= (int)sizeof(path)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = __sheepdog_mkvol(path, req->vdi_size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(&rep, 0x0, sizeof(rep));
        rep.proto_ver = SD_PROTO_VER;
        rep.opcode = req->opcode;
        rep.data_length = sizeof(rep);
        rep.id = req->id;
        rep.result = SD_RES_SUCCESS;

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        __sheepdog_reply_errno1(se->sd, ret, _req);
        return ret;
}

/*
 * Fill @inode with a synthetic sheepdog inode for vdi @vid.
 *
 * The volume is found with sd_map_get() and stat'ed with stor_getattr()
 * (retried via USLEEP_RETRY on any error, limit 100, 100ms sleep).  The inode
 * reports:
 *   - the volume size, and its mtime as ctime;
 *   - 2 copies and the default block size;
 *   - every data block covering st_size mapped to @vid itself.
 *
 * Returns 0 on success, EFBIG if the volume needs more blocks than
 * data_vdi_id[] holds (MAX_DATA_OBJS), or the lookup/getattr error.
 * @inode is left unmodified on error.
 */
static int __sheepdog_build_inode(uint32_t vid, SheepdogInode *inode)
{
        int ret, retry = 0;
        uint64_t i, chknum, blksize;
        struct stat stbuf;
        fileid_t fileid;

        ret = sd_map_get(vid, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

retry_getattr:
        ret = stor_getattr("default", &fileid, &stbuf);
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry_getattr, retry, 100, (100 * 1000));
        }

        /* number of blocks covering st_size, rounded up */
        blksize = (uint64_t)1 << SD_DEFAULT_BLOCK_SIZE_SHIFT;
        chknum = ((uint64_t)stbuf.st_size + blksize - 1) / blksize;

        /* data_vdi_id[] has MAX_DATA_OBJS slots; a larger volume would overflow it */
        if (chknum > MAX_DATA_OBJS) {
                ret = EFBIG;
                DWARN("vid %u size %llu needs %llu blocks, max %llu\n", vid,
                      (LLU)stbuf.st_size, (LLU)chknum, (LLU)MAX_DATA_OBJS);
                GOTO(err_ret, ret);
        }

        memset(inode, 0x0, sizeof(*inode));
        inode->ctime = stbuf.st_mtime;
        inode->vdi_size = stbuf.st_size;
        inode->nr_copies = 2;
        inode->block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
        inode->vdi_id = vid;

        for (i = 0; i < chknum; i++) {
                inode->data_vdi_id[i] = vid;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Handle SD_OP_LOCK_VDI on the blocking socket path.
 *
 * Reads the request header, the vdi name (SD_MAX_VDI_LEN bytes) and the tag
 * (SD_MAX_VDI_TAG_LEN bytes) from se->sd, then:
 *   - looks up "/<sheepdog_root>/<name>", retrying EAGAIN via USLEEP_RETRY;
 *   - opens it in the sd_map to obtain a vdi id;
 *   - stores that id in se->vid and replies with it.
 * A path that does not fit MAX_PATH_LEN is rejected with ENAMETOOLONG.  On
 * failure an error reply derived from @_req is written and the errno is
 * returned.
 */
static int  __sheepdog_lock_vdi(sheepdog_session_t *se, const SheepdogReq *_req)
{
        int ret, retry = 0, vid;
        char buf[MAX_BUF_LEN], *name, path[MAX_PATH_LEN];
        const SheepdogVdiReq *req;
        SheepdogVdiRsp rep;
        fileid_t fileid;

        req = (void *)_req;
        ret = __read(se->sd, buf, sizeof(*req) + SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        name = buf + sizeof(*req);
        /* the name comes off the wire: force termination inside its field */
        name[SD_MAX_VDI_LEN - 1] = '\0';

        DINFO("SHEEPDOG: req version %u, 0x%x, datalen %u, size %llu, name %s, real %u\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->vdi_size, name, ret);

        /* bound by the real buffer size and refuse to act on a truncated path */
        ret = snprintf(path, sizeof(path), "/%s/%s", gloconf.sheepdog_root, name);
        if (ret < 0 || ret >= (int)sizeof(path)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

retry_lookup:
        ret = stor_lookup1("default", path, &fileid);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry_lookup, retry, 100, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = sd_map_open(&vid, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(vid);

        se->vid = vid;
        memset(&rep, 0x0, sizeof(rep));
        rep.proto_ver = SD_PROTO_VER;
        rep.opcode = req->opcode;
        rep.data_length = sizeof(rep);
        rep.result = SD_RES_SUCCESS;
        rep.vdi_id = vid;
        rep.id = req->id;

        DINFO("vid %u\n", vid);

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        __sheepdog_reply_errno1(se->sd, ret, _req);
        return ret;
}

/*
 * Handle SD_OP_GET_CLUSTER_DEFAULT on the blocking socket path.
 *
 * Consumes the request header from se->sd and answers with fixed cluster
 * defaults: 2 copies and SD_DEFAULT_BLOCK_SIZE_SHIFT.  Unlike the vdi
 * handlers, no error reply is sent on failure; the errno is just returned.
 */
static int  __sheepdog_get_cluster_default(sheepdog_session_t *se, const SheepdogReq *_req)
{
        const SheepdogVdiReq *req = (void *)_req;
        SheepdogClusterRsp rep;
        char buf[MAX_BUF_LEN];
        int ret, got;

        got = __read(se->sd, buf, sizeof(*req));
        if (got < 0) {
                ret = -got;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        DINFO("SHEEPDOG: req version %u, 0x%x, datalen %u, realsize %u\n",
              req->proto_ver, req->opcode, req->data_length, got);

        memset(&rep, 0x0, sizeof(rep));
        rep.proto_ver = SD_PROTO_VER;
        rep.opcode = req->opcode;
        rep.id = req->id;
        rep.data_length = sizeof(rep);
        rep.result = SD_RES_SUCCESS;
        rep.nr_copies = 2;
        rep.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Handle SD_OP_READ_VDIS on the blocking socket path.
 *
 * Consumes the request header, then replies with a header followed by
 * req->data_length zero bytes (presumably an all-clear in-use vdi bitmap —
 * TODO confirm).  rep.data_length announces exactly the payload that follows,
 * so a client that reads rsp.data_length bytes gets the whole payload.
 * On failure the errno is returned without sending an error reply.
 */
static int  __sheepdog_read_vdis(sheepdog_session_t *se, const SheepdogReq *_req)
{
        int ret;
        uint32_t len;
        char buf[MAX_BUF_LEN];
        const SheepdogReq *req;
        SheepdogRsp rep;
        void *ptr;

        (void) _req;

        ret = __read(se->sd, buf, sizeof(*req));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        len = req->data_length;

        DINFO("SHEEPDOG: req version %u, 0x%x, datalen %u, realsize %u\n",
              req->proto_ver, req->opcode, req->data_length, ret);

        memset(&rep, 0x0, sizeof(rep));
        rep.proto_ver = SD_PROTO_VER;
        rep.opcode = req->opcode;
        /* size of the payload that follows the header, not of the header */
        rep.data_length = len;
        rep.result = SD_RES_SUCCESS;
        rep.id = req->id;

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        if (len == 0)
                return 0;

        ret = ymalloc((void **)&ptr, len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(ptr, 0x0, len);
        ret = _write(se->sd, ptr, len);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_free, ret);
        }

        yfree((void **)&ptr);

        return 0;
err_free:
        yfree((void **)&ptr);
err_ret:
        return ret;
}

/*
 * Serve a read of an inode object.
 *
 * Builds the synthetic inode for the vdi encoded in req->oid and appends a
 * copy of it to @buf.  @rep is filled as a success reply carrying
 * sizeof(SheepdogInode) bytes.  The scratch inode is freed on every path:
 * mbuffer_copy() copies the bytes (it is also used on stack data in
 * __sheepdog_reply()).
 *
 * NOTE(review): the whole inode is returned regardless of req->offset and
 * req->data_length — confirm clients always request the full inode.
 *
 * Returns 0 or the allocation/__sheepdog_build_inode() error; @buf is not
 * touched on error.
 */
static int __sheepdog_read_inode(const SheepdogObjReq *req, SheepdogRsp *rep, buffer_t *buf)
{
        int ret, vid;
        SheepdogInode *inode;

        vid = (req->oid & (~VDI_BIT) & ((uint64_t)UINT32_MAX << VDI_SPACE_SHIFT)) >> VDI_SPACE_SHIFT;

        DINFO("SHEEPDOG: read inode req version %u, 0x%x, size %u, offset %llu, vid %u\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->offset, vid);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->data_length = sizeof(SheepdogInode);
        rep->result = SD_RES_SUCCESS;
        rep->id = req->id;

        ret = ymalloc((void **)&inode, sizeof(*inode));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __sheepdog_build_inode(vid, inode);
        if (unlikely(ret))
                GOTO(err_free, ret);

        mbuffer_copy(buf, (void *)inode, sizeof(*inode));
        yfree((void **)&inode);

        return 0;
err_free:
        yfree((void **)&inode);
err_ret:
        return ret;
}

/*
 * Read @size bytes at @offset of volume @fileid into @buf.
 *
 * Errors are converted with _errno().  EAGAIN and ENOSPC are retried after a
 * schedule_sleep(1000 * 1000) — presumably 1s — for at most 100 attempts and
 * while gettime() - begin < 180.  With USE_DIRECT_IO, the read goes through
 * se->entry while it is set.  An EREMCHG/ESTALE result drops that entry and
 * retries through stor_read() without counting as a retry.
 *
 * Returns 0 on success or the converted error.
 */
static int __sheepdog_read_raw__(sheepdog_session_t *se, const fileid_t *fileid,
                                 buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret, retry = 0;
        time_t begin = gettime();

retry:
#if USE_DIRECT_IO
        if (se->entry) {
                ret = volume_ctl_read_direct(se->entry, buf, size, offset, 1);
        } else {
                ret = stor_read("default", fileid, buf, size, offset);
        }
#else
        (void) se;

        ret = stor_read("default", fileid, buf, size, offset);
#endif
        if (unlikely(ret)) {
#if USE_DIRECT_IO
                /* the volume is no longer local: drop the entry, retry via stor_read() */
                if ((ret == EREMCHG  || ret == ESTALE) && se->entry) {
                        if (se->entry) {
                                mcache_release(se->entry);
                                se->entry = NULL;
                        }
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(fileid));
                        goto retry;
                }
#endif

                ret = _errno(ret);
                /* transient errors: retry, bounded by attempt count and elapsed time */
                if (retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                    && ((gettime() - begin) < 180)) {
                        /* stay quiet for the first retries */
                        if (retry > 10) {
                                DINFO("read "CHKID_FORMAT" (%llu, %llu),"
                                      " ret (%d) %s, need retry %u\n",
                                      CHKID_ARG(fileid),
                                      (LLU)offset, (LLU)size,
                                      ret, strerror(ret), retry);
                        }

                        retry++;
                        schedule_sleep("sd_read", 1000 * 1000);
                        goto retry;
                } else {
                        DWARN("read "CHKID_FORMAT" (%llu, %llu),"
                              " ret (%d) %s, need retry %u\n", CHKID_ARG(fileid),
                              (LLU)offset, (LLU)size, ret, strerror(ret), retry);
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Serve a read of a data object.
 *
 * The vdi id (bits above VDI_SPACE_SHIFT) and block index (low 32 bits) are
 * decoded from req->oid.  The vdi is mapped to its volume, and
 * req->data_length bytes are read at idx * block_size + req->offset into
 * @buf.  On success @rep is a success reply carrying buf->len bytes.
 */
static int __sheepdog_read_raw(sheepdog_session_t *se, const SheepdogObjReq *req,
                               SheepdogRsp *rep, buffer_t *buf)
{
        fileid_t fileid;
        uint64_t idx, offset;
        int ret, vid;

        idx = req->oid & (uint64_t)UINT32_MAX;
        vid = (req->oid & ((uint64_t)UINT32_MAX << VDI_SPACE_SHIFT)) >> VDI_SPACE_SHIFT;

        DBUG("SHEEPDOG: read raw req version %u, 0x%x, size %u, offset %llu, vid %u, idx %llu\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->offset, vid, (LLU)idx);

        ret = sd_map_get(vid, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        offset = (idx << SD_DEFAULT_BLOCK_SIZE_SHIFT) + req->offset;
        ret = __sheepdog_read_raw__(se, &fileid, buf, req->data_length, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->id = req->id;
        rep->data_length = buf->len;
        rep->result = SD_RES_SUCCESS;

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch an object read.  Inode objects (VDI_BIT set in req->oid) are
 * synthesized; data objects are read from the backing volume.
 */
static int  __sheepdog_read_obj__(sheepdog_session_t *se, const SheepdogObjReq *req,
                                  SheepdogRsp *rep, buffer_t *buf)
{
        int ret;

        ret = (req->oid & VDI_BIT) ? __sheepdog_read_inode(req, rep, buf)
                                   : __sheepdog_read_raw(se, req, rep, buf);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Apply the differences between the current inode @old and the client's
 * modified copy @new.
 *
 * Only a vdi_size change is supported.  It is applied with stor_truncate() on
 * the volume mapped to old->vdi_id.  Any other difference is logged and hits
 * UNIMPLEMENTED().
 *
 * @new->name and @new->tag hold client-supplied bytes that may lack a
 * terminating NUL, so they are compared and printed bounded by field size.
 *
 * Returns 0 on success or the sd_map_get()/stor_truncate() error.
 */
static int __sheepdog_update_inode(const SheepdogInode *old, const SheepdogInode *new)
{
        int ret;
        uint64_t i;
        fileid_t fileid;

        ret = sd_map_get(old->vdi_id, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (strncmp(old->name, new->name, sizeof(old->name))) {
                DWARN("update name from %.*s to %.*s\n",
                      (int)sizeof(old->name), old->name,
                      (int)sizeof(new->name), new->name);
                UNIMPLEMENTED(__DUMP__);
        }

        if (strncmp(old->tag, new->tag, sizeof(old->tag))) {
                DWARN("update tag from %.*s to %.*s\n",
                      (int)sizeof(old->tag), old->tag,
                      (int)sizeof(new->tag), new->tag);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->ctime != new->ctime) {
                DWARN("update ctime from %llu to %llu\n", (LLU)old->ctime, (LLU)new->ctime);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->snap_ctime != new->snap_ctime) {
                DWARN("update snap_ctime from %llu to %llu\n",
                      (LLU)old->snap_ctime, (LLU)new->snap_ctime);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->vm_clock_nsec != new->vm_clock_nsec) {
                DWARN("update vm_clock_nsec from %llu to %llu\n",
                      (LLU)old->vm_clock_nsec, (LLU)new->vm_clock_nsec);
                UNIMPLEMENTED(__DUMP__);
        }

        /* the only supported change: resize the backing volume */
        if (old->vdi_size != new->vdi_size) {
                DWARN("update vdi_size from %llu to %llu\n", (LLU)old->vdi_size, (LLU)new->vdi_size);

                ret = stor_truncate("default",&fileid, new->vdi_size);
                if(ret)
                        GOTO(err_ret, ret);
        }

        if (old->vm_state_size != new->vm_state_size) {
                DWARN("update vm_state_size from %llu to %llu\n",
                      (LLU)old->vm_state_size, (LLU)new->vm_state_size);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->copy_policy != new->copy_policy) {
                DWARN("update copy_policy from %u to %u\n", old->copy_policy, new->copy_policy);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->nr_copies != new->nr_copies) {
                DWARN("update nr_copies from %u to %u\n", old->nr_copies, new->nr_copies);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->block_size_shift != new->block_size_shift) {
                DWARN("update block_size_shift from %u to %u\n",
                      old->block_size_shift, new->block_size_shift);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->snap_id != new->snap_id) {
                DWARN("update snap_id from %u to %u\n", old->snap_id, new->snap_id);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->vdi_id != new->vdi_id) {
                DWARN("update vdi_id from %u to %u\n", old->vdi_id, new->vdi_id);
                UNIMPLEMENTED(__DUMP__);
        }

        if (old->parent_vdi_id != new->parent_vdi_id) {
                DWARN("update parent_vdi_id from %u to %u\n", old->parent_vdi_id, new->parent_vdi_id);
                UNIMPLEMENTED(__DUMP__);
        }

        for (i = 0; i < MAX_CHILDREN; i++) {
                if (old->child_vdi_id[i] != new->child_vdi_id[i]) {
                        DWARN("update child_vdi_id[%llu] from %llu to %llu\n", (LLU)i,
                              (LLU)old->child_vdi_id[i], (LLU)new->child_vdi_id[i]);
                        UNIMPLEMENTED(__DUMP__);
                }
        }

        for (i = 0; i < MAX_DATA_OBJS; i++) {
                if (old->data_vdi_id[i] != new->data_vdi_id[i]) {
                        DWARN("update data_vdi_id[%llu] from %llu to %llu\n", (LLU)i,
                              (LLU)old->data_vdi_id[i], (LLU)new->data_vdi_id[i]);
                        UNIMPLEMENTED(__DUMP__);
                }
        }

        return 0;
err_ret:
        return ret;
}


/*
 * Apply a client write to an inode object.
 *
 * Steps:
 *   - synthesize the current inode with __sheepdog_build_inode();
 *   - copy the req->data_length bytes of @buf over a copy of it at
 *     req->offset;
 *   - let __sheepdog_update_inode() apply the differences.
 * On success @rep is a success reply with no payload.
 *
 * The request is client input, so a write that does not address inode index
 * 0, or that extends past the end of the inode, is rejected with EINVAL.  A
 * write ending exactly at the end of the inode is valid.
 *
 * Returns 0 or the error.
 */
static int __sheepdog_write_inode(const SheepdogObjReq *req, SheepdogRsp *rep, const buffer_t *buf)
{
        int ret, vid, idx;
        SheepdogInode *inode1, *inode2;
        char *data;
        void *tmp;

        vid = (req->oid & (~VDI_BIT) & ((uint64_t)UINT32_MAX << VDI_SPACE_SHIFT)) >> VDI_SPACE_SHIFT;
        idx = req->oid & (~VDI_BIT) & (uint64_t)UINT32_MAX;

        DINFO("SHEEPDOG: write inode req version %u, 0x%x, size %u, offset %llu, vid %u, idx %u\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->offset, vid, idx);

        YASSERT(req->data_length == buf->len);

        /* overflow-safe form of: offset + data_length <= sizeof(inode) */
        if (idx != 0 || req->offset > sizeof(*inode1)
            || req->data_length > sizeof(*inode1) - req->offset) {
                ret = EINVAL;
                DWARN("bad inode write vid %u idx %u offset %llu size %u\n",
                      vid, idx, (LLU)req->offset, req->data_length);
                GOTO(err_ret, ret);
        }

        /* both inodes first so they stay naturally aligned; raw bytes follow */
        ret = ymalloc((void **)&tmp, sizeof(*inode1) * 2 + buf->len);
        if (ret)
                GOTO(err_ret, ret);

        inode1 = tmp;
        inode2 = inode1 + 1;
        data = (char *)(inode2 + 1);

        ret = __sheepdog_build_inode(vid, inode1);
        if (ret)
                GOTO(err_free, ret);

        mbuffer_get(buf, data, buf->len);
        memcpy(inode2, inode1, sizeof(*inode1));
        memcpy((char *)inode2 + req->offset, data, buf->len);

        ret = __sheepdog_update_inode(inode1, inode2);
        if (ret)
                GOTO(err_free, ret);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->result = SD_RES_SUCCESS;
        rep->id = req->id;

        yfree((void **)&tmp);

        return 0;
err_free:
        yfree((void **)&tmp);
err_ret:
        return ret;
}

/*
 * Write @size bytes from @buf at @offset of volume @fileid.
 *
 * Errors are converted with _errno().  EAGAIN and ENOSPC are retried after a
 * schedule_sleep(1000 * 1000) — presumably 1s — for at most 100 attempts and
 * while gettime() - begin < 180.  With USE_DIRECT_IO, the write goes through
 * se->entry while it is set.  An EREMCHG/ESTALE result drops that entry and
 * retries through stor_write() without counting as a retry.
 *
 * Returns 0 on success or the converted error.
 */
static int __sheepdog_write_raw__(sheepdog_session_t *se, const fileid_t *fileid,
                                  const buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret, retry = 0;
        time_t begin = gettime();

retry:
#if USE_DIRECT_IO
        if (se->entry) {
                ret = volume_ctl_write_direct(se->entry, buf, size, offset, 1);
        } else {
                ret = stor_write("default", fileid, buf, size, offset);
        }
#else
        (void) se;
        ret = stor_write("default", fileid, buf, size, offset);
#endif

        if (unlikely(ret)) {
#if USE_DIRECT_IO
                /* the volume is no longer local: drop the entry, retry via stor_write() */
                if ((ret == EREMCHG  || ret == ESTALE) && se->entry) {
                        if (se->entry) {
                                mcache_release(se->entry);
                                se->entry = NULL;
                        }
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(fileid));
                        goto retry;
                }
#endif

                ret = _errno(ret);
                /* transient errors: retry, bounded by attempt count and elapsed time */
                if (retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                    && ((gettime() - begin) < 180)) {
                        /* stay quiet for the first retries */
                        if (retry > 10) {
                                DINFO("write "CHKID_FORMAT" (%llu, %llu),"
                                      " ret (%d) %s, need retry %u\n",
                                      CHKID_ARG(fileid),
                                      (LLU)offset, (LLU)size,
                                      ret, strerror(ret), retry);
                        }

                        retry++;
                        schedule_sleep("sd_write", 1000 * 1000);
                        goto retry;
                } else {
                        DWARN("write "CHKID_FORMAT" (%llu, %llu),"
                              " ret (%d) %s, need retry %u\n",
                              CHKID_ARG(fileid),
                              (LLU)offset, (LLU)size,
                              ret, strerror(ret), retry);
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Serve a write of a data object.
 *
 * The vdi id (bits above VDI_SPACE_SHIFT) and block index (low 32 bits) are
 * decoded from req->oid.  The vdi is mapped to its volume, and
 * req->data_length bytes from @buf are written at
 * idx * block_size + req->offset.  On success @rep is a success reply with
 * data_length 0.
 */
static int __sheepdog_write_raw(sheepdog_session_t *se, const SheepdogObjReq *req,
                                SheepdogRsp *rep, const buffer_t *buf)
{
        fileid_t fileid;
        uint64_t idx, offset;
        int ret, vid;

        idx = req->oid & (uint64_t)UINT32_MAX;
        vid = (req->oid & ((uint64_t)UINT32_MAX << VDI_SPACE_SHIFT)) >> VDI_SPACE_SHIFT;

        DBUG("SHEEPDOG: write raw req version %u, 0x%x, size %u, offset %llu, vid %u, idx %llu\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->offset, vid, (LLU)idx);

        ret = sd_map_get(vid, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        offset = (idx << SD_DEFAULT_BLOCK_SIZE_SHIFT) + req->offset;
        ret = __sheepdog_write_raw__(se, &fileid, buf, req->data_length, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->id = req->id;
        rep->result = SD_RES_SUCCESS;

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch an object write.  Inode objects (VDI_BIT set in req->oid) update
 * volume metadata; data objects are written to the backing volume.
 */
static int  __sheepdog_write_obj__(sheepdog_session_t *se, const SheepdogObjReq *req,
                                   SheepdogRsp *rep, const buffer_t *buf)
{
        int ret;

        ret = (req->oid & VDI_BIT) ? __sheepdog_write_inode(req, rep, buf)
                                   : __sheepdog_write_raw(se, req, rep, buf);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Build the success reply for SD_OP_FLUSH_VDI.
 *
 * Nothing is flushed here; the request is only acknowledged.  The reply is
 * header-only, so data_length is 0, as in the write handlers.  Always
 * returns 0.
 */
static int  __sheepdog_flush_vdi__(const SheepdogObjReq *req, SheepdogRsp *rep)
{

        DBUG("SHEEPDOG: req version %u, 0x%x, datalen %u\n",
              req->proto_ver, req->opcode, req->data_length);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->data_length = 0;   /* header only: no payload follows */
        rep->result = SD_RES_SUCCESS;
        rep->id = req->id;

        return 0;
}

/*
 * Handle SD_OP_FLUSH_VDI on the blocking socket path.
 *
 * Consumes the request header from se->sd and writes the acknowledgement.
 * On failure an error reply derived from @_req is written and the errno is
 * returned.
 */
static int  __sheepdog_flush_vdi(sheepdog_session_t *se, const SheepdogReq *_req)
{
        char buf[MAX_BUF_LEN];
        const SheepdogObjReq *objreq;
        SheepdogRsp rep;
        int ret;

        ret = __read(se->sd, buf, sizeof(*objreq));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        objreq = (void *)buf;
        ret = __sheepdog_flush_vdi__(objreq, &rep);
        if (ret)
                GOTO(err_ret, ret);

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        __sheepdog_reply_errno1(se->sd, ret, _req);
        return ret;
}

/*
 * Handle the body of SD_OP_RELEASE_VDI.
 *
 * Closes the sd_map entry for req->base_vdi_id and fills @rep with a success
 * reply.  The reply is header-only, so data_length is 0.
 *
 * Returns 0 or the sd_map_close() error (@rep untouched in that case).
 */
static int  __sheepdog_release_vdi__(const SheepdogVdiReq *req, SheepdogRsp *rep)
{
        int ret;

        DINFO("SHEEPDOG: req version %u, 0x%x, datalen %u, size %llu\n",
              req->proto_ver, req->opcode, req->data_length, (LLU)req->vdi_size);

        ret = sd_map_close(req->base_vdi_id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->data_length = 0;   /* header only: no payload follows */
        rep->result = SD_RES_SUCCESS;
        rep->id = req->id;

        return 0;
err_ret:
        return ret;
}

/*
 * Handle SD_OP_RELEASE_VDI on the blocking socket path.
 *
 * Reads the request header plus _req->data_length payload bytes into a stack
 * buffer, releases the vdi, and writes the reply.  _req->data_length is
 * client-controlled, so a payload that does not fit the buffer is rejected
 * with EINVAL instead of overflowing it.  On failure an error reply derived
 * from @_req is written and the errno is returned.
 */
static int  __sheepdog_release_vdi(sheepdog_session_t *se, const SheepdogReq *_req)
{
        int ret;
        char buf[MAX_BUF_LEN];
        const SheepdogVdiReq *req;
        SheepdogRsp rep;

        if (_req->data_length > sizeof(buf) - sizeof(*req)) {
                ret = EINVAL;
                DWARN("release vdi datalen %u too large\n", _req->data_length);
                GOTO(err_ret, ret);
        }

        ret = __read(se->sd, buf, sizeof(*req) + _req->data_length);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        ret = __sheepdog_release_vdi__(req, &rep);
        if (ret)
                GOTO(err_ret, ret);

        ret = _write(se->sd, &rep, sizeof(rep));
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        __sheepdog_reply_errno1(se->sd, ret, _req);
        return ret;
}

/*
 * Send a reply on @sockid: a copy of the response header, followed by the
 * contents of @_buf when it is non-NULL (merged in via mbuffer_merge()).
 * The corenet_tcp_send() result is not checked; always returns 0.
 */
static int __sheepdog_reply(const sockid_t *sockid, const SheepdogRsp *rep, buffer_t *_buf)
{
        buffer_t out;

        mbuffer_init(&out, 0);
        mbuffer_copy(&out, (void *)rep, sizeof(*rep));

        if (_buf != NULL)
                mbuffer_merge(&out, _buf);

        corenet_tcp_send(sockid, &out, 0);
        DBUG("__reply__\n");

        return 0;
}

/*
 * Fill @rep with an error reply for @req.
 *
 * errno → sheepdog result mapping:
 *   EEXIST → SD_RES_VDI_EXIST
 *   ENOSPC → SD_RES_NO_SPACE
 *   ENOENT → SD_RES_NO_VDI
 *   other  → SD_RES_UNKNOWN
 */
static void __sheepdog_reply_errno__(int err, const SheepdogReq *req, SheepdogRsp *rep)
{
        memset(rep, 0x0, sizeof(*rep));
        rep->proto_ver = SD_PROTO_VER;
        rep->opcode = req->opcode;
        rep->id = req->id;

        switch (err) {
        case EEXIST:
                rep->result = SD_RES_VDI_EXIST;
                break;
        case ENOSPC:
                rep->result = SD_RES_NO_SPACE;
                break;
        case ENOENT:
                rep->result = SD_RES_NO_VDI;
                break;
        default:
                rep->result = SD_RES_UNKNOWN;
                break;
        }
}

/*
 * Send an error reply for @req carrying @err (mapped by
 * __sheepdog_reply_errno__()) over corenet to @sockid.  Always returns 0.
 */
static int __sheepdog_reply_errno(const sockid_t *sockid, int err, const SheepdogReq *req)
{
        SheepdogRsp rsp;

        __sheepdog_reply_errno__(err, req, &rsp);
        (void) __sheepdog_reply(sockid, &rsp, NULL);

        return 0;
}

/*
 * Write an error reply for @req carrying @err (mapped by
 * __sheepdog_reply_errno__()) directly to socket @sd.
 *
 * Returns 0 on success or the positive errno from _write().
 */
static int __sheepdog_reply_errno1(int sd, int err, const SheepdogReq *req)
{
        SheepdogRsp reply;
        int ret;

        __sheepdog_reply_errno__(err, req, &reply);

        ret = _write(sd, &reply, sizeof(reply));
        if (unlikely(ret < 0)) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}


static void __sheepdog_func(void *arg)
{
        int ret;
        SheepdogReq *req;
        sd_request_t *sd_request;
        buffer_t *buf, tmp;
        SheepdogRsp rep;
        sheepdog_session_t *se;

        sd_request = arg;
        req = &sd_request->req;
        se = sd_request->ctx;

        se->running++;

        buf = NULL;
        switch (req->opcode) {
        case SD_OP_GET_CLUSTER_DEFAULT:
                UNIMPLEMENTED(__DUMP__);
                        
                break;
        case SD_OP_NEW_VDI:
                UNIMPLEMENTED(__DUMP__);
                        
                break;
        case SD_OP_LOCK_VDI:
                UNIMPLEMENTED(__DUMP__);

                break;
        case SD_OP_READ_OBJ:
                mbuffer_init(&tmp, 0);
                ret = __sheepdog_read_obj__(se, (void *)&sd_request->req,
                                            &rep, &tmp);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                buf = &tmp;

                break;
        case SD_OP_WRITE_OBJ:
        case SD_OP_CREATE_AND_WRITE_OBJ:
                ret = __sheepdog_write_obj__(se, (void *)&sd_request->req,
                                             &rep, &sd_request->buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                break;
        case SD_OP_FLUSH_VDI:
                ret = __sheepdog_flush_vdi__((void *)&sd_request->req, &rep);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                break;
        case SD_OP_RELEASE_VDI:
                ret = __sheepdog_release_vdi__((void *)&sd_request->req, &rep);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
                        
                break;
        default:
                DERROR("bad msgtype 0x%x\n", req->opcode);
                YASSERT(0);
        }

        __sheepdog_reply(&se->sockid, &rep, buf);

        mbuffer_free(&sd_request->buf);
        mem_cache_free(MEM_CACHE_128, sd_request);

        if (buf)
                mbuffer_free(buf);

        se->running--;

        return;
err_ret:
        __sheepdog_reply_errno(&se->sockid, ret, &sd_request->req);

        mbuffer_free(&sd_request->buf);
        mem_cache_free(MEM_CACHE_128, sd_request);

        if (buf)
                mbuffer_free(buf);

        se->running--;
}


static int __sheepdog_newtask(void *ctx, void *buf, int *_count)
{
        int ret, count = 0, size = 0;
        SheepdogReq req, tmp;
        sd_request_t *sd_request;
        const char *name = NULL;
        buffer_t *_buf = buf;

        DBUG("__request__\n");

        while (_buf->len >= sizeof(req)) {
                mbuffer_get(_buf, &req, sizeof(req));

                switch (req.opcode) {
                case SD_OP_GET_CLUSTER_DEFAULT:
                        size = 0;
                        name = "sd_default";

                        break;
                case SD_OP_NEW_VDI:
                        size = SD_MAX_VDI_LEN;
                        name = "sd_new_vdi";

                        break;
                case SD_OP_LOCK_VDI:
                        size = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
                        name = "sd_lock";

                        break;

                case SD_OP_READ_OBJ:
                        size = 0;
                        name = "sd_read";

                        break;
                case SD_OP_WRITE_OBJ:
                case SD_OP_CREATE_AND_WRITE_OBJ:
                        if (req.data_length + sizeof(req) > _buf->len) {
                                DBUG("cmd %u size %u len %u\n", req.opcode, req.data_length, _buf->len);
                                goto out;
                        }

                        size = req.data_length;
                        name = "sd_write";

                        break;
                case SD_OP_FLUSH_VDI:
                        size = 0;
                        name = "sd_flush";

                        break;
                                                
                case SD_OP_RELEASE_VDI:
                        size = req.data_length;
                        name = "sd_release";

                        break;
                default:
                        DERROR("bad msgtype 0x%x\n", req.opcode);
                        YASSERT(0);
                }

                ret = mbuffer_popmsg(_buf, &tmp, sizeof(tmp));
                YASSERT(ret == 0);

                
#ifdef HAVE_STATIC_ASSERT
                static_assert(sizeof(*sd_request)  < sizeof(mem_cache128_t), "sd_request_t");
#endif

                sd_request = mem_cache_calloc(MEM_CACHE_128, 0);
                YASSERT(sd_request);

                sd_request->req = req;
                sd_request->ctx = ctx;
                mbuffer_init(&sd_request->buf, 0);

                if (size)
                        mbuffer_pop(_buf, &sd_request->buf, size);

                schedule_task_new(name, __sheepdog_func, sd_request, -1);
                count++;
        }

        if (count) {
                DBUG("new task %u\n", count);
        }

out:
        *_count = count;

        return 0;
}

/*
 * Move a session from its dedicated worker thread onto the core (corenet)
 * event loop.  This is called for the first object read/write on the
 * session.
 *
 * Steps:
 *   - tune the socket for its transport (unix or TCP);
 *   - extract the VDI id from @req->oid and resolve the volume with
 *     sd_map_get();
 *   - for unix-socket sessions, call stor_localize() (ENOSPC tolerated);
 *   - fill in se->sockid;
 *   - attach the session to the core chosen by core_hash(&se->id), with
 *     __sheepdog_newtask, __sheepdog_close and __sheepdog_session_check as
 *     callbacks.
 *
 * Returns 0 on success or an errno-style error code.
 */
static int __sheepdog_dispatch(sheepdog_session_t *se, const SheepdogObjReq *req)
{
        int ret, hash;
        uint32_t vid;
        uint64_t oid;

        if (se->type == __TYPE_UNIX__)
                ret = sock_unix_tuning(se->sd);
        else
                ret = tcp_sock_tuning(se->sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* drop VDI_BIT if present, then take the 32-bit field at VDI_SPACE_SHIFT */
        oid = req->oid;
        if (oid & VDI_BIT)
                oid &= ~VDI_BIT;
        vid = (oid & ((uint64_t)UINT32_MAX << VDI_SPACE_SHIFT)) >> VDI_SPACE_SHIFT;

        ret = sd_map_get(vid, &se->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (se->type == __TYPE_UNIX__) {
                /* ENOSPC from stor_localize() is deliberately ignored */
                ret = stor_localize("default", &se->id);
                if (unlikely(ret) && ret != ENOSPC)
                        GOTO(err_ret, ret);

#if USE_DIRECT_IO
                ret = volume_ctl_get(&se->id, &se->entry);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("%s not localized\n", se->export_name);
                        } else {
                                DWARN("%s check fail %u %s\n", se->export_name, ret, strerror(ret));
                        }
                } else {
                        DINFO("%s localized\n", se->export_name);
                }
#endif
        }

        hash = core_hash(&se->id);

        se->sockid.sd = se->sd;
        se->sockid.seq = _random();
        se->sockid.type = SOCKID_CORENET;
        se->sockid.addr = se->addr;

        ret = core_attach(hash, &se->sockid, "sheepdog", se,
                          __sheepdog_newtask, __sheepdog_close, __sheepdog_session_check);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Execute one request received on a session that is still served by its
 * worker thread (see __sheepdog_worker()).
 *
 * Control-path opcodes are handled synchronously by their handlers.  An
 * object read/write instead calls __sheepdog_dispatch() to attach the
 * session to the core.  If that succeeds, the calling worker thread is
 * terminated with pthread_exit() and this function does not return.
 *
 * Returns 0 on success, EINVAL for an unsupported opcode, or the error
 * code reported by the handler.
 */
static int __sheepdog_exec(sheepdog_session_t *se, const SheepdogReq *req)
{
        int ret;

        switch (req->opcode) {
        case SD_OP_GET_CLUSTER_DEFAULT:
                ret = __sheepdog_get_cluster_default(se, req);
                break;
        case SD_OP_READ_VDIS:
                ret = __sheepdog_read_vdis(se, req);
                break;
        case SD_OP_NEW_VDI:
                ret = __sheepdog_new_vdi(se, req);
                break;
        case SD_OP_LOCK_VDI:
        case SD_OP_GET_VDI_INFO:
                ret = __sheepdog_lock_vdi(se, req);
                break;
        case SD_OP_READ_OBJ:
        case SD_OP_WRITE_OBJ:
        case SD_OP_CREATE_AND_WRITE_OBJ:
                ret = __sheepdog_dispatch(se, (void *)req);
                if (ret == 0) {
                        /* the session is now attached to the core; retire this worker */
                        DINFO("exit thread\n");
                        pthread_exit(NULL);
                }
                break;
        case SD_OP_FLUSH_VDI:
                ret = __sheepdog_flush_vdi(se, req);
                break;
        case SD_OP_RELEASE_VDI:
                ret = __sheepdog_release_vdi(se, req);
                break;
        default:
                ret = EINVAL;
                DWARN("op 0x%x\n", req->opcode);
                break;
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Per-connection thread that serves a freshly accepted session until the
 * session is attached to the core.
 *
 * Each iteration:
 *   - wait for input (1 s poll timeout);
 *   - check that a full SheepdogReq header is readable;
 *   - peek the header (MSG_PEEK, nothing consumed);
 *   - hand it to __sheepdog_exec().
 * The handler is therefore expected to consume the request from the socket
 * itself.  When __sheepdog_exec() attaches the session to the core, this
 * thread ends inside it via pthread_exit(), and the socket and session
 * stay alive.  On any error, the socket is closed and the session (@_arg)
 * is freed.
 */
static void *__sheepdog_worker(void *_arg)
{
        int ret, sd, toread;
        sheepdog_session_t *se = _arg;
        char buf[MAX_BUF_LEN];
        SheepdogReq *req;

        sd = se->sd;

        req = (void *)buf;
        while (1) {
                ret = sock_poll_sd(sd, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        /*
                         * Idle timeout: keep waiting.  Accept both codes, as
                         * the listener loops do, so an idle client is not
                         * disconnected after one second.
                         */
                        if (ret == ETIME || ret == ETIMEDOUT)
                                continue;
                        GOTO(err_ret, ret);
                }

                ret = ioctl(sd, FIONREAD, &toread);
                if (unlikely(ret < 0)) {
                        /* ioctl() returns -1; the real cause is in errno */
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                if (toread == 0) {
                        /* readable but nothing to read: the peer has closed */
                        ret = ECONNRESET;
                        GOTO(err_ret, ret);
                }

                if (toread < (int)sizeof(*req)) {
                        DWARN("got %u, need %u\n", toread, (int)sizeof(*req));
                        /*
                         * Header only partly arrived.  poll() would return
                         * immediately again, so back off briefly rather than
                         * spin on the CPU.
                         */
                        usleep(1000);
                        continue;
                }

                ret = _recv(sd, buf, sizeof(*req), MSG_PEEK);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_ret, ret);
                }

                ret = __sheepdog_exec(se, req);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return NULL;
err_ret:
#if 0
        if (se->vid)
                sd_map_close(se->vid);
#endif

        close(se->sd);
        yfree((void **)&se);
        return NULL;
}


/*
 * Accept one pending connection on the TCP listening socket
 * __tcp_listen_sd__.  The connection is wrapped in a sheepdog session and
 * served by a new detached __sheepdog_worker() thread.
 *
 * The peer's IPv4 address (sin_addr.s_addr) is stored in se->addr.  On
 * failure, the accepted socket and the session are released.
 *
 * Returns 0 on success or an errno-style error code.
 */
static int __sheepdog_accept(void)
{
        int ret, sd;
        sheepdog_session_t *se;
        struct sockaddr_in sin;
        socklen_t alen;
        pthread_t th;
        pthread_attr_t ta;

        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(sin);

        /* accept() takes a generic struct sockaddr *; the cast is required */
        sd = accept(__tcp_listen_sd__, (struct sockaddr *)&sin, &alen);
        if (sd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = sheepdog_session_alloc(&se, sd, __TYPE_TCP__);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        se->addr = sin.sin_addr.s_addr;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __sheepdog_worker, se);
        /* the attribute object is not needed once pthread_create() returns */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&se);
err_fd:
        close(sd);
err_ret:
        return ret;
}

/*
 * Listener thread for the TCP endpoint.  It waits (1 s poll timeout) for
 * incoming connections on __tcp_listen_sd__ and accepts them one at a time
 * with __sheepdog_accept().
 *
 * Two kinds of accept error are logged and retried after a short pause:
 * those that affect only the connection being accepted (e.g. the client
 * aborted), and temporary resource shortages (fd limits, memory, thread
 * creation).  Previously any such error ended the thread and left the TCP
 * endpoint permanently dead.  Any other error still ends the thread.
 */
static void *__sheepdog_passive__(void *_arg)
{
        int ret;

        (void) _arg;

        while (1) {
                ret = sock_poll_sd(__tcp_listen_sd__, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                ret = __sheepdog_accept();
                if (unlikely(ret)) {
                        switch (ret) {
                        case EINTR:
                        case ECONNABORTED:
                        case EPROTO:
                        case EPERM:
                        case EMFILE:
                        case ENFILE:
                        case ENOBUFS:
                        case ENOMEM:
                        case EAGAIN:
                                /* transient: keep the listener alive */
                                DWARN("accept fail %d, retry\n", ret);
                                usleep(100 * 1000);
                                continue;
                        default:
                                GOTO(err_ret, ret);
                        }
                }
        }

        return NULL;
err_ret:
        return NULL;
}

/*
 * Accept one pending connection on the unix-domain listening socket
 * __unix_listen_sd__.  The connection is wrapped in a sheepdog session of
 * type __TYPE_UNIX__ and served by a new detached __sheepdog_worker()
 * thread.
 *
 * Unix peers have no IP address; se->addr is set to the fixed placeholder
 * value 123.  On failure, the accepted socket and the session are released.
 *
 * Returns 0 on success or an errno-style error code.
 */
static int __sheepdog_unix_accept(void)
{
        int ret, sd;
        sheepdog_session_t *se;
        socklen_t alen;
        pthread_t th;
        pthread_attr_t ta;
        struct sockaddr_un peer;

        _memset(&peer, 0, sizeof(peer));
        alen = sizeof(peer);

        /* accept() takes a generic struct sockaddr *; the cast is required */
        sd = accept(__unix_listen_sd__, (struct sockaddr *)&peer, &alen);
        if (sd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = sheepdog_session_alloc(&se, sd, __TYPE_UNIX__);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        se->addr = 123;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __sheepdog_worker, se);
        /* the attribute object is not needed once pthread_create() returns */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&se);
err_fd:
        close(sd);
err_ret:
        return ret;
}

/*
 * Listener thread for the unix-domain endpoint.  It waits (1 s poll
 * timeout) for incoming connections on __unix_listen_sd__ and accepts them
 * one at a time with __sheepdog_unix_accept().
 *
 * Two kinds of accept error are logged and retried after a short pause:
 * those that affect only the connection being accepted, and temporary
 * resource shortages (fd limits, memory, thread creation).  Previously any
 * such error ended the thread and left the unix endpoint permanently dead.
 * Any other error still ends the thread.
 */
static void *__sheepdog_unix_passive__(void *_arg)
{
        int ret;

        (void) _arg;

        while (1) {
                ret = sock_poll_sd(__unix_listen_sd__, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                ret = __sheepdog_unix_accept();
                if (unlikely(ret)) {
                        switch (ret) {
                        case EINTR:
                        case ECONNABORTED:
                        case EPROTO:
                        case EPERM:
                        case EMFILE:
                        case ENFILE:
                        case ENOBUFS:
                        case ENOMEM:
                        case EAGAIN:
                                /* transient: keep the listener alive */
                                DWARN("unix accept fail %d, retry\n", ret);
                                usleep(100 * 1000);
                                continue;
                        default:
                                GOTO(err_ret, ret);
                        }
                }
        }

        return NULL;
err_ret:
        return NULL;
}

static int __sheepdog_tcp_passive()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        char port[MAX_BUF_LEN];

        snprintf(port, MAX_BUF_LEN, "%u", gloconf.sheepdog_port);
        ret = tcp_sock_hostlisten(&__tcp_listen_sd__, NULL, port,
                                  YNET_QLEN, YNET_RPC_BLOCK, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __sheepdog_passive__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __sheepdog_unix_passive()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sock_unix_listen(gloconf.sheepdog_sock, &__unix_listen_sd__, &__unix_listen_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("listen @ %s, sd %u\n", gloconf.sheepdog_sock, __unix_listen_sd__);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __sheepdog_unix_passive__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * One-shot initialisation thread started by sheepdog_srv().  It brings up
 * the TCP listener first, then the unix-domain listener.
 *
 * Each stage is retried every 2 seconds while its address is in use
 * (EADDRINUSE); any other error abandons initialisation.  The stages are
 * retried independently.  Previously, a busy unix socket re-ran the TCP
 * setup, whose port was already bound by our own listener, so the loop
 * spun forever and the unix listener never came up.
 */
static void *__sheepdog_service(void *_arg)
{
        int ret;

        (void) _arg;

        while ((ret = __sheepdog_tcp_passive()) != 0) {
                if (ret != EADDRINUSE)
                        GOTO(err_ret, ret);

                sleep(2);
        }

        while ((ret = __sheepdog_unix_passive()) != 0) {
                if (ret != EADDRINUSE)
                        GOTO(err_ret, ret);

                sleep(2);
        }

        DINFO("sheepdog service inited\n");

        return NULL;
err_ret:
        return NULL;
}

/*
 * Entry point of the sheepdog front end.  It initialises the VDI map
 * (sd_map_init()) and starts the detached __sheepdog_service() thread,
 * which sets up the TCP and unix-domain listeners in the background.
 *
 * Returns 0 once the service thread has been started, or an errno-style
 * error code; listener setup errors are not reported here.
 */
int sheepdog_srv()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sd_map_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __sheepdog_service, NULL);
        /* the attribute object is not needed once pthread_create() returns */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
